From c5084ecddccafd6380efe87f6a5994e7996766fd Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 17 Oct 2024 15:41:58 +0800 Subject: [PATCH 1/7] add more to avro --- ticdc/ticdc-avro-protocol.md | 78 ++++++++++++++++++++++-------------- ticdc/ticdc-sink-to-kafka.md | 14 +++++-- 2 files changed, 59 insertions(+), 33 deletions(-) diff --git a/ticdc/ticdc-avro-protocol.md b/ticdc/ticdc-avro-protocol.md index b616631729b6c..53e73f2cd6de3 100644 --- a/ticdc/ticdc-avro-protocol.md +++ b/ticdc/ticdc-avro-protocol.md @@ -5,7 +5,9 @@ summary: Learn the concept of TiCDC Avro Protocol and how to use it. # TiCDC Avro Protocol -Avro is a data exchange format protocol defined by [Apache Avro™](https://avro.apache.org/) and chosen by [Confluent Platform](https://docs.confluent.io/platform/current/platform.html) as the default data exchange format. This document describes the implementation of the Avro data format in TiCDC, including TiDB extension fields, definition of the Avro data format, and the interaction between Avro and [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html). +TiCDC Avro protocol is a third part implementation of the [Confluent Avro](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html) data exchange protocol which is defined by [Confluent Platform](https://docs.confluent.io/platform/current/platform.html). Avro is a data format defined by [Apache Avro™](https://avro.apache.org/) + +This document describes how the TiCDC implement the Confluent Avro protocol, and the interaction between Avro and [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html). > **Warning:** > @@ -13,8 +15,6 @@ Avro is a data exchange format protocol defined by [Apache Avro™](https://avro ## Use Avro -When using Message Queue (MQ) as a downstream sink, you can specify Avro in `sink-uri`. 
TiCDC captures TiDB DML events, creates Avro messages from these events, and sends the messages downstream. When Avro detects a schema change, it registers the latest schema with Schema Registry. - The following is a configuration example using Avro: {{< copyable "shell-regular" >}} @@ -32,28 +32,9 @@ dispatchers = [ The value of `--schema-registry` supports the `https` protocol and `username:password` authentication, for example, `--schema-registry=https://username:password@schema-registry-uri.com`. The username and password must be URL-encoded. -## TiDB extension fields - -By default, Avro only collects data of changed rows in DML events and does not collect the type of data changes or TiDB-specific CommitTS (the unique identifiers of transactions). To address this issue, TiCDC introduces the following three TiDB extension fields to the Avro protocol message. When `enable-tidb-extension` is set to `true` (`false` by default) in `sink-uri`, TiCDC adds these three fields to the Avro messages during message generation. - -- `_tidb_op`: The DML type. "c" indicates insert and "u" indicates updates. -- `_tidb_commit_ts`: The unique identifier of a transaction. -- `_tidb_commit_physical_time`: The physical timestamp in a transaction identifier. - -The following is a configuration example: - -{{< copyable "shell-regular" >}} - -```shell -cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml -``` - -```shell -[sink] -dispatchers = [ - {matcher = ['*.*'], topic = "tidb_{schema}_{table}"}, -] -``` +> **Note:** +> +> When using the Avro protocol, One Kafka Topic must only have data for one table, should configure the [Topic dispatcher](/ticdc/ticdc-sink-to-kafka.md#topic-dispatchers) in the config file. 
## Definition of the data format @@ -95,6 +76,32 @@ The `fields` in the key contains only primary key columns or unique index column The data format of Value is the same as that of Key, by default. However, `fields` in the Value contains all columns, not just the primary key columns. +> **Note** +> Avro protocol only encode the Key when encoding the Delete event. For the Insert event, encoding all column data into Value. For Update event, only encoding all columns data after change. + +## TiDB extension fields + +By default, Avro only collects data of changed rows in DML events and does not collect the type of data changes or TiDB-specific CommitTS (the unique identifiers of transactions). To address this issue, TiCDC introduces the following three TiDB extension fields to the Avro protocol message. When `enable-tidb-extension` is set to `true` (`false` by default) in `sink-uri`, TiCDC adds these three fields to the Avro messages during message generation. + +- `_tidb_op`: The DML type. "c" indicates insert and "u" indicates updates. +- `_tidb_commit_ts`: The unique identifier of a transaction. +- `_tidb_commit_physical_time`: The physical timestamp in a transaction identifier. + +The following is a configuration example: + +{{< copyable "shell-regular" >}} + +```shell +cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml +``` + +```shell +[sink] +dispatchers = [ + {matcher = ['*.*'], topic = "tidb_{schema}_{table}"}, +] +``` + After you enable [`enable-tidb-extension`](#tidb-extension-fields), the data format of the Value will be as follows: ``` @@ -159,7 +166,7 @@ If one column can be NULL, the Column data format can be: - `{{ColumnName}}` indicates the column name. 
- `{{TIDB_TYPE}}` indicates the type in TiDB, which is not a one-to-one mapping with the SQL type. -- `{{AVRO_TYPE}}` indicates the type in [avro spec](https://avro.apache.org/docs/current/spec.html). +- `{{AVRO_TYPE}}` indicates the type in [avro spec](https://avro.apache.org/docs/++version++/specification). | SQL TYPE | TIDB_TYPE | AVRO_TYPE | Description | |------------|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------| @@ -272,14 +279,27 @@ DECIMAL(10, 4) ## DDL events and schema changes -Avro does not generate DDL events downstream. It checks whether a schema changes each time a DML event occurs. If a schema changes, Avro generates a new schema and registers it with the Schema Registry. If the schema change does not pass the compatibility check, the registration fails. TiCDC does not resolve any schema compatibility issues. +Avro does not send DDL events and Watermark event to downstream. It checks whether a schema changes each time a DML event occurs. If a schema changes, Avro generates a new schema and registers it with the Schema Registry. If the schema change does not pass the compatibility check, the registration fails. TiCDC does not resolve any schema compatibility issues. -Note that, even if a schema change passes the compatibility check and a new version is registered, the data producers and consumers still need to perform an upgrade to ensure normal running of the system. +For example, the default [compatibility policy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#compatibility-types) of Confluent Schema Registry is `BACKWARD`. Add a non-empty column to the source table, in this situation, Avro generates a new schema but fails to register it with Schema Registry due to compatibility issues. At this time, the changefeed enters an error state. 
-Assume that the default compatibility policy of Confluent Schema Registry is `BACKWARD` and add a non-empty column to the source table. In this situation, Avro generates a new schema but fails to register it with Schema Registry due to compatibility issues. At this time, the changefeed enters an error state. +Note that, even if a schema change passes the compatibility check and a new version is registered, the data producers and consumers still need to acquire the schema to support data encoding and decoding. For more information about schemas, refer to [Schema Registry related documents](https://docs.confluent.io/platform/current/schema-registry/avro.html). +## Consumer implementation + +TiCDC Avro protocol support decoding by the [`io.confluent.kafka.serializers.KafkaAvroDeserializer`](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-deserializer). + +The consumer program can acquire the schema by using [Schema Registry API](https://docs.confluent.io/platform/current/schema-registry/develop/api.html), which can be used to deserialize the TiCDC avro protocol encoded data. + +### Distinguish event type + +Consumer can distinguish DML event type by the following rule: + +* If only have Key part, it is a Delete event. +* If have both Key and Value part, it is a Insert or Update event. If enable the [TiDB extension fields](#tidb-extension-fields), use the `_tidb_op` field to know whether it is Insert or Update, else cannot distinguish these two. + ## Topic distribution Schema Registry supports three [Subject Name Strategies](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy): TopicNameStrategy, RecordNameStrategy, and TopicRecordNameStrategy. Currently, TiCDC Avro only supports TopicNameStrategy, which means that a Kafka topic can only receive data in one data format. Therefore, TiCDC Avro prohibits mapping multiple tables to the same topic. 
When you create a changefeed, an error will be reported if the topic rule does not include the `{schema}` and `{table}` placeholders in the configured distribution rule. diff --git a/ticdc/ticdc-sink-to-kafka.md b/ticdc/ticdc-sink-to-kafka.md index 474ef7f0312c2..930135993aea4 100644 --- a/ticdc/ticdc-sink-to-kafka.md +++ b/ticdc/ticdc-sink-to-kafka.md @@ -80,7 +80,7 @@ The following are descriptions of sink URI parameters and values that can be con | `replication-factor` | The number of Kafka message replicas that can be saved (optional, `1` by default). This value must be greater than or equal to the value of [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) in Kafka. | | `required-acks` | A parameter used in the `Produce` request, which notifies the broker of the number of replica acknowledgements it needs to receive before responding. Value options are `0` (`NoResponse`: no response, only `TCP ACK` is provided), `1` (`WaitForLocal`: responds only after local commits are submitted successfully), and `-1` (`WaitForAll`: responds after all replicated replicas are committed successfully. You can configure the minimum number of replicated replicas using the [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) configuration item of the broker). (Optional, the default value is `-1`). | | `compression` | The compression algorithm used when sending messages (value options are `none`, `lz4`, `gzip`, `snappy`, and `zstd`; `none` by default). Note that the Snappy compressed file must be in the [official Snappy format](https://github.com/google/snappy). Other variants of Snappy compression are not supported.| -| `protocol` | The protocol with which messages are output to Kafka. The value options are `canal-json`, `open-protocol`, `avro`, `debezium`, and `simple`. | +| `protocol` | The protocol with which messages are output to Kafka. 
The value options are [`canal-json`](/ticdc/ticdc-canal-json.md)、[`open-protocol`](/ticdc/ticdc-open-protocol.md)、[`avro`](/ticdc/ticdc-avro-protocol.md)、[`debezium`](/ticdc/ticdc-debezium.md) and [`simple`](/ticdc/ticdc-simple-protocol.md). | | `auto-create-topic` | Determines whether TiCDC creates the topic automatically when the `topic-name` passed in does not exist in the Kafka cluster (optional, `true` by default). | | `enable-tidb-extension` | Optional. `false` by default. When the output protocol is `canal-json`, if the value is `true`, TiCDC sends [WATERMARK events](/ticdc/ticdc-canal-json.md#watermark-event) and adds the [TiDB extension field](/ticdc/ticdc-canal-json.md#tidb-extension-field) to Kafka messages. From v6.1.0, this parameter is also applicable to the `avro` protocol. If the value is `true`, TiCDC adds [three TiDB extension fields](/ticdc/ticdc-avro-protocol.md#tidb-extension-fields) to the Kafka message. | | `max-batch-size` | New in v4.0.9. If the message protocol supports outputting multiple data changes to one Kafka message, this parameter specifies the maximum number of data changes in one Kafka message. It currently takes effect only when Kafka's `protocol` is `open-protocol` (optional, `16` by default). | @@ -114,7 +114,13 @@ The following are descriptions of sink URI parameters and values that can be con > **Note:** > -> When `protocol` is `open-protocol`, TiCDC tries to avoid generating messages that exceed `max-message-bytes` in length. However, if a row is so large that a single change alone exceeds `max-message-bytes` in length, to avoid silent failure, TiCDC tries to output this message and prints a warning in the log. +> When `protocol` is `open-protocol`, TiCDC tries to batch multiple encoded messages into one Kafka message, avoid generating messages that exceed `max-message-bytes` in length. 
+> If the encoding result of a single row changed event is larger than the `max-message-bytes` in length, changefeed report error, and print logs. + +> **注意:** +> +> 当 `protocol` 为 `open-protocol` 时,TiCDC 会将多个事件编码到同一个 Kafka 消息中,此过程中尽量避免产生长度超过 `max-message-bytes` 的消息。 +> 如果单条数据变更编码得到的消息大小超过了 `max-message-bytes` 个字节,changefeed 会报错,打印错误日志。 ### TiCDC uses the authentication and authorization of Kafka @@ -172,7 +178,7 @@ The following are examples when using Kafka SASL authentication: ### Integrate TiCDC with Kafka Connect (Confluent Platform) -To use the [data connectors](https://docs.confluent.io/current/connect/managing/connectors.html) provided by Confluent to stream data to relational or non-relational databases, you need to use the `avro` protocol and provide a URL for [Confluent Schema Registry](https://www.confluent.io/product/confluent-platform/data-compatibility/) in `schema-registry`. +To use the [data connectors](https://docs.confluent.io/current/connect/managing/connectors.html) provided by Confluent to stream data to relational or non-relational databases, you need to use the [`avro` protocol](/ticdc/ticdc-avro-protocol.md) and provide a URL for [Confluent Schema Registry](https://www.confluent.io/product/confluent-platform/data-compatibility/) in `schema-registry`. Sample configuration: @@ -191,7 +197,7 @@ For detailed integration guide, see [Quick Start Guide on Integrating TiDB with ### Integrate TiCDC with AWS Glue Schema Registry -Starting from v7.4.0, TiCDC supports using the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) as the Schema Registry when users choose the Avro protocol for data replication. The configuration example is as follows: +Starting from v7.4.0, TiCDC supports using the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) as the Schema Registry when users choose the [Avro protocol](/ticdc/ticdc-avro-protocol.md) for data replication. 
The configuration example is as follows: ```shell ./cdc cli changefeed create --server=127.0.0.1:8300 --changefeed-id="kafka-glue-test" --sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --config changefeed_glue.toml From a6990bbff46cd3097eaa11a6d653b6d043e9c034 Mon Sep 17 00:00:00 2001 From: xixirangrang Date: Mon, 21 Oct 2024 10:13:40 +0800 Subject: [PATCH 2/7] Apply suggestions from code review --- ticdc/ticdc-avro-protocol.md | 32 +++++++++++++++++--------------- ticdc/ticdc-sink-to-kafka.md | 9 ++------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/ticdc/ticdc-avro-protocol.md b/ticdc/ticdc-avro-protocol.md index 53e73f2cd6de3..4b926b048916d 100644 --- a/ticdc/ticdc-avro-protocol.md +++ b/ticdc/ticdc-avro-protocol.md @@ -7,7 +7,7 @@ summary: Learn the concept of TiCDC Avro Protocol and how to use it. TiCDC Avro protocol is a third part implementation of the [Confluent Avro](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html) data exchange protocol which is defined by [Confluent Platform](https://docs.confluent.io/platform/current/platform.html). Avro is a data format defined by [Apache Avro™](https://avro.apache.org/) -This document describes how the TiCDC implement the Confluent Avro protocol, and the interaction between Avro and [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html). +This document describes how TiCDC implements the Confluent Avro protocol, and the interaction between Avro and [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html). > **Warning:** > @@ -34,7 +34,7 @@ The value of `--schema-registry` supports the `https` protocol and `username:pas > **Note:** > -> When using the Avro protocol, One Kafka Topic must only have data for one table, should configure the [Topic dispatcher](/ticdc/ticdc-sink-to-kafka.md#topic-dispatchers) in the config file. 
+> When using the Avro protocol, one Kafka Topic can only have data for one table. You need to configure the [Topic dispatcher](/ticdc/ticdc-sink-to-kafka.md#topic-dispatchers) in the configuration file. ## Definition of the data format @@ -76,8 +76,12 @@ The `fields` in the key contains only primary key columns or unique index column The data format of Value is the same as that of Key, by default. However, `fields` in the Value contains all columns, not just the primary key columns. -> **Note** -> Avro protocol only encode the Key when encoding the Delete event. For the Insert event, encoding all column data into Value. For Update event, only encoding all columns data after change. +> **Note:** +> +> The Avro protocol encodes DML events as follows: +> - For Delete events, only encodes the Key part. The Value part is empty. +> - For Insert events, encodes all column data to the Value part. +> - For Update events, encodes only all column data that is updated. ## TiDB extension fields @@ -89,8 +93,6 @@ By default, Avro only collects data of changed rows in DML events and does not c The following is a configuration example: -{{< copyable "shell-regular" >}} - ```shell cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml ``` @@ -166,7 +168,7 @@ If one column can be NULL, the Column data format can be: - `{{ColumnName}}` indicates the column name. - `{{TIDB_TYPE}}` indicates the type in TiDB, which is not a one-to-one mapping with the SQL type. -- `{{AVRO_TYPE}}` indicates the type in [avro spec](https://avro.apache.org/docs/++version++/specification). +- `{{AVRO_TYPE}}` indicates the type in the [Avro Specification](https://avro.apache.org/docs/++version++/specification). 
| SQL TYPE | TIDB_TYPE | AVRO_TYPE | Description | |------------|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------| @@ -279,9 +281,9 @@ DECIMAL(10, 4) ## DDL events and schema changes -Avro does not send DDL events and Watermark event to downstream. It checks whether a schema changes each time a DML event occurs. If a schema changes, Avro generates a new schema and registers it with the Schema Registry. If the schema change does not pass the compatibility check, the registration fails. TiCDC does not resolve any schema compatibility issues. +Avro does not send DDL events and Watermark events to downstream. It checks whether a schema changes each time a DML event occurs. If a schema changes, Avro generates a new schema and registers it with the Schema Registry. If the schema change does not pass the compatibility check, the registration fails. TiCDC does not resolve any schema compatibility issues. -For example, the default [compatibility policy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#compatibility-types) of Confluent Schema Registry is `BACKWARD`. Add a non-empty column to the source table, in this situation, Avro generates a new schema but fails to register it with Schema Registry due to compatibility issues. At this time, the changefeed enters an error state. +For example, the default [compatibility policy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#compatibility-types) of Confluent Schema Registry is `BACKWARD`, and you add a non-empty column to the source table. In this situation, Avro generates a new schema but fails to register it with Schema Registry due to compatibility issues. At this time, the changefeed enters an error state. 
Note that, even if a schema change passes the compatibility check and a new version is registered, the data producers and consumers still need to acquire the schema to support data encoding and decoding. @@ -289,16 +291,16 @@ For more information about schemas, refer to [Schema Registry related documents] ## Consumer implementation -TiCDC Avro protocol support decoding by the [`io.confluent.kafka.serializers.KafkaAvroDeserializer`](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-deserializer). +The TiCDC Avro protocol can be deserialized by [`io.confluent.kafka.serializers.KafkaAvroDeserializer`](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-deserializer). -The consumer program can acquire the schema by using [Schema Registry API](https://docs.confluent.io/platform/current/schema-registry/develop/api.html), which can be used to deserialize the TiCDC avro protocol encoded data. +The consumer program can acquire the schema by using [Schema Registry API](https://docs.confluent.io/platform/current/schema-registry/develop/api.html), and then uses it to deserialize the data encoded by the TiCDC Avro protocol. -### Distinguish event type +### Distinguish event types -Consumer can distinguish DML event type by the following rule: +The consumer program can distinguish DML event types by the following rules: -* If only have Key part, it is a Delete event. -* If have both Key and Value part, it is a Insert or Update event. If enable the [TiDB extension fields](#tidb-extension-fields), use the `_tidb_op` field to know whether it is Insert or Update, else cannot distinguish these two. +* If there is only the Key part, then it is a Delete event. +* If there are both Key and Value parts, then it is either an Insert or an Update event. 
If the [TiDB extension fields](#tidb-extension-fields) are enabled, you can use the `_tidb_op` field to identify if it is an Insert or Update event. If the TiDB extension fields are not enabled, you cannot distinguish them. ## Topic distribution diff --git a/ticdc/ticdc-sink-to-kafka.md b/ticdc/ticdc-sink-to-kafka.md index 930135993aea4..6e42a50bdd6af 100644 --- a/ticdc/ticdc-sink-to-kafka.md +++ b/ticdc/ticdc-sink-to-kafka.md @@ -114,13 +114,8 @@ The following are descriptions of sink URI parameters and values that can be con > **Note:** > -> When `protocol` is `open-protocol`, TiCDC tries to batch multiple encoded messages into one Kafka message, avoid generating messages that exceed `max-message-bytes` in length. -> If the encoding result of a single row changed event is larger than the `max-message-bytes` in length, changefeed report error, and print logs. - -> **注意:** -> -> 当 `protocol` 为 `open-protocol` 时,TiCDC 会将多个事件编码到同一个 Kafka 消息中,此过程中尽量避免产生长度超过 `max-message-bytes` 的消息。 -> 如果单条数据变更编码得到的消息大小超过了 `max-message-bytes` 个字节,changefeed 会报错,打印错误日志。 +> When `protocol` is `open-protocol`, TiCDC encodes multiple events into one Kafka message. Avoid generating messages that exceed the length specified by `max-message-bytes`. +> If the encoded result of a single row change event exceeds the value of `max-message-bytes`, the changefeed reports an error, and prints logs. 
### TiCDC uses the authentication and authorization of Kafka From f31bc13c9eb6d6fe374ad68e268434dc41107989 Mon Sep 17 00:00:00 2001 From: xixirangrang Date: Mon, 21 Oct 2024 10:14:58 +0800 Subject: [PATCH 3/7] Update ticdc-avro-protocol.md --- ticdc/ticdc-avro-protocol.md | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ticdc/ticdc-avro-protocol.md b/ticdc/ticdc-avro-protocol.md index 4b926b048916d..06dd41b9c1135 100644 --- a/ticdc/ticdc-avro-protocol.md +++ b/ticdc/ticdc-avro-protocol.md @@ -22,6 +22,11 @@ The following is a configuration example using Avro: ```shell cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml ``` +The value of `--schema-registry` supports the `https` protocol and `username:password` authentication, for example, `--schema-registry=https://username:password@schema-registry-uri.com`. The username and password must be URL-encoded. + +> **Note:** +> +> When using the Avro protocol, one Kafka Topic can only have data for one table. You need to configure the [Topic dispatcher](/ticdc/ticdc-sink-to-kafka.md#topic-dispatchers) in the configuration file. ```shell [sink] @@ -30,12 +35,6 @@ dispatchers = [ ] ``` -The value of `--schema-registry` supports the `https` protocol and `username:password` authentication, for example, `--schema-registry=https://username:password@schema-registry-uri.com`. The username and password must be URL-encoded. - -> **Note:** -> -> When using the Avro protocol, one Kafka Topic can only have data for one table. You need to configure the [Topic dispatcher](/ticdc/ticdc-sink-to-kafka.md#topic-dispatchers) in the configuration file. - ## Definition of the data format TiCDC converts a DML event into a Kafka event, and the Key and Value of an event are encoded according to the Avro protocol. 
From 125d5338e7b29822c40558a17f5ec6053b2a0423 Mon Sep 17 00:00:00 2001 From: xixirangrang Date: Tue, 29 Oct 2024 11:17:30 +0800 Subject: [PATCH 4/7] Update ticdc/ticdc-avro-protocol.md --- ticdc/ticdc-avro-protocol.md | 1 + 1 file changed, 1 insertion(+) diff --git a/ticdc/ticdc-avro-protocol.md b/ticdc/ticdc-avro-protocol.md index 06dd41b9c1135..4a72152f7e9d5 100644 --- a/ticdc/ticdc-avro-protocol.md +++ b/ticdc/ticdc-avro-protocol.md @@ -78,6 +78,7 @@ The data format of Value is the same as that of Key, by default. However, `field > **Note:** > > The Avro protocol encodes DML events as follows: +> > - For Delete events, only encodes the Key part. The Value part is empty. > - For Insert events, encodes all column data to the Value part. > - For Update events, encodes only all column data that is updated. From b0ed30fc945139e421336b469f9d8dbadb9e4ab3 Mon Sep 17 00:00:00 2001 From: xixirangrang Date: Fri, 1 Nov 2024 20:23:02 +0800 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: Grace Cai --- ticdc/ticdc-avro-protocol.md | 22 +++++++++++----------- ticdc/ticdc-sink-to-kafka.md | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/ticdc/ticdc-avro-protocol.md b/ticdc/ticdc-avro-protocol.md index 4a72152f7e9d5..391320ed1085c 100644 --- a/ticdc/ticdc-avro-protocol.md +++ b/ticdc/ticdc-avro-protocol.md @@ -5,7 +5,7 @@ summary: Learn the concept of TiCDC Avro Protocol and how to use it. # TiCDC Avro Protocol -TiCDC Avro protocol is a third part implementation of the [Confluent Avro](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html) data exchange protocol which is defined by [Confluent Platform](https://docs.confluent.io/platform/current/platform.html). 
Avro is a data format defined by [Apache Avro™](https://avro.apache.org/) +TiCDC Avro protocol is a third-party implementation of the [Confluent Avro](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html) data exchange protocol defined by [Confluent Platform](https://docs.confluent.io/platform/current/platform.html). Avro is a data format defined by [Apache Avro™](https://avro.apache.org/). This document describes how TiCDC implements the Confluent Avro protocol, and the interaction between Avro and [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html). @@ -22,11 +22,11 @@ The following is a configuration example using Avro: ```shell cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml ``` -The value of `--schema-registry` supports the `https` protocol and `username:password` authentication, for example, `--schema-registry=https://username:password@schema-registry-uri.com`. The username and password must be URL-encoded. +The value of `--schema-registry` supports the `https` protocol and `username:password` authentication. The username and password must be URL-encoded. For example, `--schema-registry=https://username:password@schema-registry-uri.com`. > **Note:** > -> When using the Avro protocol, one Kafka Topic can only have data for one table. You need to configure the [Topic dispatcher](/ticdc/ticdc-sink-to-kafka.md#topic-dispatchers) in the configuration file. +> When using the Avro protocol, one Kafka topic can only contain data for one table. You need to configure the [Topic dispatcher](/ticdc/ticdc-sink-to-kafka.md#topic-dispatchers) in the configuration file. 
```shell [sink] @@ -73,15 +73,15 @@ The `fields` in the key contains only primary key columns or unique index column } ``` -The data format of Value is the same as that of Key, by default. However, `fields` in the Value contains all columns, not just the primary key columns. +The data format of Value is the same as that of Key, by default. However, `fields` in the Value contains all columns. > **Note:** > > The Avro protocol encodes DML events as follows: > -> - For Delete events, only encodes the Key part. The Value part is empty. -> - For Insert events, encodes all column data to the Value part. -> - For Update events, encodes only all column data that is updated. +> - For Delete events, Avro only encodes the Key part. The Value part is empty. +> - For Insert events, Avro encodes all column data to the Value part. +> - For Update events, Avro encodes all column data after the update to the Value part. ## TiDB extension fields @@ -285,7 +285,7 @@ Avro does not send DDL events and Watermark events to downstream. It checks whet For example, the default [compatibility policy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#compatibility-types) of Confluent Schema Registry is `BACKWARD`, and you add a non-empty column to the source table. In this situation, Avro generates a new schema but fails to register it with Schema Registry due to compatibility issues. At this time, the changefeed enters an error state. -Note that, even if a schema change passes the compatibility check and a new version is registered, the data producers and consumers still need to acquire the schema to support data encoding and decoding. +Note that, even if a schema change passes the compatibility check and a new version is registered, the data producers and consumers still need to acquire the new schema for data encoding and decoding. 
For more information about schemas, refer to [Schema Registry related documents](https://docs.confluent.io/platform/current/schema-registry/avro.html). @@ -293,14 +293,14 @@ For more information about schemas, refer to [Schema Registry related documents] The TiCDC Avro protocol can be deserialized by [`io.confluent.kafka.serializers.KafkaAvroDeserializer`](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-deserializer). -The consumer program can acquire the schema by using [Schema Registry API](https://docs.confluent.io/platform/current/schema-registry/develop/api.html), and then uses it to deserialize the data encoded by the TiCDC Avro protocol. +The consumer program can acquire the latest schema via [Schema Registry API](https://docs.confluent.io/platform/current/schema-registry/develop/api.html), and then use the schema to deserialize the data encoded by the TiCDC Avro protocol. ### Distinguish event types The consumer program can distinguish DML event types by the following rules: -* If there is only the Key part, then it is a Delete event. -* If there are both Key and Value parts, then it is either an Insert or an Update event. If the [TiDB extension fields](#tidb-extension-fields) are enabled, you can use the `_tidb_op` field to identify if it is an Insert or Update event. If the TiDB extension fields are not enabled, you cannot distinguish them. +* If there is only the Key part, it is a Delete event. +* If there are both Key and Value parts, it is either an Insert or an Update event. If the [TiDB extension fields](#tidb-extension-fields) are enabled, you can use the `_tidb_op` field to identify if it is an Insert or Update event. If the TiDB extension fields are not enabled, you cannot distinguish them. 
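
The rules above can be sketched as a small helper. This is a minimal illustration, not part of TiCDC: the function name `classify_event` is hypothetical, and it assumes the Key and Value parts of a Kafka message have already been deserialized into Python dictionaries, with `None` standing for an absent Value part. The `_tidb_op` values `"c"` (insert) and `"u"` (update) are the ones described under TiDB extension fields.

```python
from typing import Optional


def classify_event(key: Optional[dict], value: Optional[dict]) -> str:
    """Classify one decoded TiCDC Avro message (hypothetical helper).

    `key` and `value` are assumed to be the already-deserialized Key and
    Value parts of a single Kafka message; `value` is None when absent.
    """
    if value is None:
        # Only the Key part is present: this is a Delete event.
        return "delete"

    # `_tidb_op` exists only when enable-tidb-extension=true in the sink URI.
    op = value.get("_tidb_op")
    if op == "c":
        return "insert"
    if op == "u":
        return "update"

    # Without the extension fields, Insert and Update cannot be told apart.
    return "insert_or_update"
```

A consumer would call this helper once per message after deserialization and branch on the returned string.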
## Topic distribution diff --git a/ticdc/ticdc-sink-to-kafka.md b/ticdc/ticdc-sink-to-kafka.md index 6e42a50bdd6af..5fc66b79b85f0 100644 --- a/ticdc/ticdc-sink-to-kafka.md +++ b/ticdc/ticdc-sink-to-kafka.md @@ -114,8 +114,8 @@ The following are descriptions of sink URI parameters and values that can be con > **Note:** > -> When `protocol` is `open-protocol`, TiCDC encodes multiple events into one Kafka message. Avoid generating messages that exceed the length specified by `max-message-bytes`. -> If the encoded result of a single row change event exceeds the value of `max-message-bytes`, the changefeed reports an error, and prints logs. +> When `protocol` is `open-protocol`, TiCDC encodes multiple events into one Kafka message and avoids generating messages that exceed the length specified by `max-message-bytes`. +> If the encoded result of a single row change event exceeds the value of `max-message-bytes`, the changefeed reports an error and prints logs. ### TiCDC uses the authentication and authorization of Kafka From 88aed18aff5b1750c70430d83d98f126cd687075 Mon Sep 17 00:00:00 2001 From: xixirangrang Date: Fri, 1 Nov 2024 20:29:21 +0800 Subject: [PATCH 6/7] Update ticdc-sink-to-kafka.md --- ticdc/ticdc-sink-to-kafka.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ticdc/ticdc-sink-to-kafka.md b/ticdc/ticdc-sink-to-kafka.md index 5fc66b79b85f0..f8238af01aeac 100644 --- a/ticdc/ticdc-sink-to-kafka.md +++ b/ticdc/ticdc-sink-to-kafka.md @@ -73,6 +73,7 @@ The following are descriptions of sink URI parameters and values that can be con | `127.0.0.1` | The IP address of the downstream Kafka services. | | `9092` | The port for the downstream Kafka. | | `topic-name` | Variable. The name of the Kafka topic. | +| `protocol` | The protocol with which messages are output to Kafka. 
The value options are [`canal-json`](/ticdc/ticdc-canal-json.md)、[`open-protocol`](/ticdc/ticdc-open-protocol.md)、[`avro`](/ticdc/ticdc-avro-protocol.md)、[`debezium`](/ticdc/ticdc-debezium.md) and [`simple`](/ticdc/ticdc-simple-protocol.md). | | `kafka-version` | The version of the downstream Kafka. This value needs to be consistent with the actual version of the downstream Kafka. | | `kafka-client-id` | Specifies the Kafka client ID of the replication task (optional. `TiCDC_sarama_producer_replication ID` by default). | | `partition-num` | The number of the downstream Kafka partitions (optional. The value must be **no greater than** the actual number of partitions; otherwise, the replication task cannot be created successfully. `3` by default). | @@ -80,7 +81,6 @@ The following are descriptions of sink URI parameters and values that can be con | `replication-factor` | The number of Kafka message replicas that can be saved (optional, `1` by default). This value must be greater than or equal to the value of [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) in Kafka. | | `required-acks` | A parameter used in the `Produce` request, which notifies the broker of the number of replica acknowledgements it needs to receive before responding. Value options are `0` (`NoResponse`: no response, only `TCP ACK` is provided), `1` (`WaitForLocal`: responds only after local commits are submitted successfully), and `-1` (`WaitForAll`: responds after all replicated replicas are committed successfully. You can configure the minimum number of replicated replicas using the [`min.insync.replicas`](https://kafka.apache.org/33/documentation.html#brokerconfigs_min.insync.replicas) configuration item of the broker). (Optional, the default value is `-1`). | | `compression` | The compression algorithm used when sending messages (value options are `none`, `lz4`, `gzip`, `snappy`, and `zstd`; `none` by default). 
Note that the Snappy compressed file must be in the [official Snappy format](https://github.com/google/snappy). Other variants of Snappy compression are not supported.| -| `protocol` | The protocol with which messages are output to Kafka. The value options are [`canal-json`](/ticdc/ticdc-canal-json.md)、[`open-protocol`](/ticdc/ticdc-open-protocol.md)、[`avro`](/ticdc/ticdc-avro-protocol.md)、[`debezium`](/ticdc/ticdc-debezium.md) and [`simple`](/ticdc/ticdc-simple-protocol.md). | | `auto-create-topic` | Determines whether TiCDC creates the topic automatically when the `topic-name` passed in does not exist in the Kafka cluster (optional, `true` by default). | | `enable-tidb-extension` | Optional. `false` by default. When the output protocol is `canal-json`, if the value is `true`, TiCDC sends [WATERMARK events](/ticdc/ticdc-canal-json.md#watermark-event) and adds the [TiDB extension field](/ticdc/ticdc-canal-json.md#tidb-extension-field) to Kafka messages. From v6.1.0, this parameter is also applicable to the `avro` protocol. If the value is `true`, TiCDC adds [three TiDB extension fields](/ticdc/ticdc-avro-protocol.md#tidb-extension-fields) to the Kafka message. | | `max-batch-size` | New in v4.0.9. If the message protocol supports outputting multiple data changes to one Kafka message, this parameter specifies the maximum number of data changes in one Kafka message. It currently takes effect only when Kafka's `protocol` is `open-protocol` (optional, `16` by default). 
| From b810d33b1f91cc52c58b567750ffb74d79ed1bf0 Mon Sep 17 00:00:00 2001 From: xixirangrang Date: Mon, 4 Nov 2024 09:09:15 +0800 Subject: [PATCH 7/7] Update ticdc/ticdc-avro-protocol.md --- ticdc/ticdc-avro-protocol.md | 1 + 1 file changed, 1 insertion(+) diff --git a/ticdc/ticdc-avro-protocol.md b/ticdc/ticdc-avro-protocol.md index 391320ed1085c..4410b37becea9 100644 --- a/ticdc/ticdc-avro-protocol.md +++ b/ticdc/ticdc-avro-protocol.md @@ -22,6 +22,7 @@ The following is a configuration example using Avro: ```shell cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml ``` + The value of `--schema-registry` supports the `https` protocol and `username:password` authentication. The username and password must be URL-encoded. For example, `--schema-registry=https://username:password@schema-registry-uri.com`. > **Note:**