-
Notifications
You must be signed in to change notification settings - Fork 688
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
- Loading branch information
1 parent
fb67dce
commit 8086923
Showing
2 changed files
with
59 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,16 +5,16 @@ summary: Learn the concept of TiCDC Avro Protocol and how to use it. | |
|
||
# TiCDC Avro Protocol | ||
|
||
Avro is a data exchange format protocol defined by [Apache Avro™](https://avro.apache.org/) and chosen by [Confluent Platform](https://docs.confluent.io/platform/current/platform.html) as the default data exchange format. This document describes the implementation of the Avro data format in TiCDC, including TiDB extension fields, definition of the Avro data format, and the interaction between Avro and [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html). | ||
TiCDC Avro protocol is a third-party implementation of the [Confluent Avro](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html) data exchange protocol defined by [Confluent Platform](https://docs.confluent.io/platform/current/platform.html). Avro is a data format defined by [Apache Avro™](https://avro.apache.org/) | ||
|
||
This document describes how TiCDC implements the Confluent Avro protocol, and the interaction between Avro and [Confluent Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html). | ||
|
||
> **Warning:** | ||
> | ||
> Starting from v7.3.0, if you enable TiCDC to [replicate tables without a valid index](/ticdc/ticdc-manage-changefeed.md#replicate-tables-without-a-valid-index), TiCDC will report an error when you create a changefeed that uses the Avro protocol. | ||
## Use Avro | ||
|
||
When using Message Queue (MQ) as a downstream sink, you can specify Avro in `sink-uri`. TiCDC captures TiDB DML events, creates Avro messages from these events, and sends the messages downstream. When Avro detects a schema change, it registers the latest schema with Schema Registry. | ||
|
||
The following is a configuration example using Avro: | ||
|
||
{{< copyable "shell-regular" >}} | ||
|
@@ -23,30 +23,11 @@ The following is a configuration example using Avro: | |
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml | ||
``` | ||
|
||
```shell | ||
[sink] | ||
dispatchers = [ | ||
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"}, | ||
] | ||
``` | ||
|
||
The value of `--schema-registry` supports the `https` protocol and `username:password` authentication, for example, `--schema-registry=https://username:[email protected]`. The username and password must be URL-encoded. | ||
The value of `--schema-registry` supports the `https` protocol and `username:password` authentication. The username and password must be URL-encoded. For example, `--schema-registry=https://username:[email protected]`. | ||
|
||
## TiDB extension fields | ||
|
||
By default, Avro only collects data of changed rows in DML events and does not collect the type of data changes or TiDB-specific CommitTS (the unique identifiers of transactions). To address this issue, TiCDC introduces the following three TiDB extension fields to the Avro protocol message. When `enable-tidb-extension` is set to `true` (`false` by default) in `sink-uri`, TiCDC adds these three fields to the Avro messages during message generation. | ||
|
||
- `_tidb_op`: The DML type. "c" indicates insert and "u" indicates updates. | ||
- `_tidb_commit_ts`: The unique identifier of a transaction. | ||
- `_tidb_commit_physical_time`: The physical timestamp in a transaction identifier. | ||
|
||
The following is a configuration example: | ||
|
||
{{< copyable "shell-regular" >}} | ||
|
||
```shell | ||
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml | ||
``` | ||
> **Note:** | ||
> | ||
> When using the Avro protocol, one Kafka topic can only contain data for one table. You need to configure the [Topic dispatcher](/ticdc/ticdc-sink-to-kafka.md#topic-dispatchers) in the configuration file. | ||
```shell | ||
[sink] | ||
|
@@ -93,7 +74,36 @@ The `fields` in the key contains only primary key columns or unique index column | |
} | ||
``` | ||
|
||
The data format of Value is the same as that of Key, by default. However, `fields` in the Value contains all columns, not just the primary key columns. | ||
The data format of Value is the same as that of Key, by default. However, `fields` in the Value contains all columns. | ||
|
||
> **Note:** | ||
> | ||
> The Avro protocol encodes DML events as follows: | ||
> | ||
> - For Delete events, Avro only encodes the Key part. The Value part is empty. | ||
> - For Insert events, Avro encodes all column data to the Value part. | ||
> - For Update events, Avro encodes only all column data that is updated to the Value part. | ||
## TiDB extension fields | ||
|
||
By default, Avro only collects data of changed rows in DML events and does not collect the type of data changes or TiDB-specific CommitTS (the unique identifiers of transactions). To address this issue, TiCDC introduces the following three TiDB extension fields to the Avro protocol message. When `enable-tidb-extension` is set to `true` (`false` by default) in `sink-uri`, TiCDC adds these three fields to the Avro messages during message generation. | ||
|
||
- `_tidb_op`: The DML type. "c" indicates insert and "u" indicates updates. | ||
- `_tidb_commit_ts`: The unique identifier of a transaction. | ||
- `_tidb_commit_physical_time`: The physical timestamp in a transaction identifier. | ||
|
||
The following is a configuration example: | ||
|
||
```shell | ||
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml | ||
``` | ||
|
||
```shell | ||
[sink] | ||
dispatchers = [ | ||
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"}, | ||
] | ||
``` | ||
|
||
After you enable [`enable-tidb-extension`](#tidb-extension-fields), the data format of the Value will be as follows: | ||
|
||
|
@@ -159,7 +169,7 @@ If one column can be NULL, the Column data format can be: | |
|
||
- `{{ColumnName}}` indicates the column name. | ||
- `{{TIDB_TYPE}}` indicates the type in TiDB, which is not a one-to-one mapping with the SQL type. | ||
- `{{AVRO_TYPE}}` indicates the type in [avro spec](https://avro.apache.org/docs/current/spec.html). | ||
- `{{AVRO_TYPE}}` indicates the type in the [Avro Specification](https://avro.apache.org/docs/++version++/specification). | ||
|
||
| SQL TYPE | TIDB_TYPE | AVRO_TYPE | Description | | ||
|------------|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------| | ||
|
@@ -272,14 +282,27 @@ DECIMAL(10, 4) | |
|
||
## DDL events and schema changes | ||
|
||
Avro does not generate DDL events downstream. It checks whether a schema changes each time a DML event occurs. If a schema changes, Avro generates a new schema and registers it with the Schema Registry. If the schema change does not pass the compatibility check, the registration fails. TiCDC does not resolve any schema compatibility issues. | ||
Avro does not send DDL events and Watermark events to downstream. It checks whether a schema changes each time a DML event occurs. If a schema changes, Avro generates a new schema and registers it with the Schema Registry. If the schema change does not pass the compatibility check, the registration fails. TiCDC does not resolve any schema compatibility issues. | ||
|
||
Note that, even if a schema change passes the compatibility check and a new version is registered, the data producers and consumers still need to perform an upgrade to ensure normal running of the system. | ||
For example, the default [compatibility policy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html#compatibility-types) of Confluent Schema Registry is `BACKWARD`, and you add a non-empty column to the source table. In this situation, Avro generates a new schema but fails to register it with Schema Registry due to compatibility issues. At this time, the changefeed enters an error state. | ||
|
||
Assume that the default compatibility policy of Confluent Schema Registry is `BACKWARD` and add a non-empty column to the source table. In this situation, Avro generates a new schema but fails to register it with Schema Registry due to compatibility issues. At this time, the changefeed enters an error state. | ||
Note that, even if a schema change passes the compatibility check and a new version is registered, the data producers and consumers still need to acquire the new schema for data encoding and decoding. | ||
|
||
For more information about schemas, refer to [Schema Registry related documents](https://docs.confluent.io/platform/current/schema-registry/avro.html). | ||
|
||
## Consumer implementation | ||
|
||
The TiCDC Avro protocol can be deserialized by [`io.confluent.kafka.serializers.KafkaAvroDeserializer`](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-avro.html#avro-deserializer). | ||
|
||
The consumer program can acquire the latest schema via [Schema Registry API](https://docs.confluent.io/platform/current/schema-registry/develop/api.html), and then use the schema to deserialize the data encoded by the TiCDC Avro protocol. | ||
|
||
### Distinguish event types | ||
|
||
The consumer program can distinguish DML event types by the following rules: | ||
|
||
* If there is only the Key part, it is a Delete event. | ||
* If there are both Key and Value parts, it is either an Insert or an Update event. If the [TiDB extension fields](#tidb-extension-fields) are enabled, you can use the `_tidb_op` field to identify if it is an Insert or Update event. If the TiDB extension fields are not enabled, you cannot distinguish them. | ||
|
||
## Topic distribution | ||
|
||
Schema Registry supports three [Subject Name Strategies](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy): TopicNameStrategy, RecordNameStrategy, and TopicRecordNameStrategy. Currently, TiCDC Avro only supports TopicNameStrategy, which means that a Kafka topic can only receive data in one data format. Therefore, TiCDC Avro prohibits mapping multiple tables to the same topic. When you create a changefeed, an error will be reported if the topic rule does not include the `{schema}` and `{table}` placeholders in the configured distribution rule. | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters